package rocketmq

import (
	"context"
	"gitee.com/yaodae/server-lib-go/common"
	"gitee.com/yaodae/server-lib-go/log"
	mqHttpSdk "github.com/aliyunmq/mq-http-go-sdk"
	"github.com/gogap/errors"
	"strings"
	"time"
)

type MqWorker struct {
	ctx           context.Context
	topic         string
	numOfMessages int32
	waitSeconds   int64
	consumeFunc   func(topic string, msg MqMsg) *common.LResult
	stop          func()
}

func NewMqWorker(ctx context.Context, topic string, numOfMessages int32, waitSeconds int64, consumeFunc func(topic string, msg MqMsg) *common.LResult) *MqWorker {
	w := &MqWorker{topic: topic, numOfMessages: numOfMessages, waitSeconds: waitSeconds, consumeFunc: consumeFunc}
	// 使用WithCancel派生一个可被取消的ctx，用来控制后台协程。
	w.ctx, w.stop = context.WithCancel(ctx)
	return w
}

func (w *MqWorker) Run() {
	go func() {
		for {
			select {
			case <-w.ctx.Done():
				log.Debug("w.ctx.Done")
				return
			default:
				w.loop()
				log.Debug("w.loop restart")
				// f.loop() 在正常运行时errch是阻塞状态，如果
				// 出错了才有数据，此时select会被唤起，并重新
				// 启动 loop()，实现panic后自动恢复。
			}
		}
	}()
}

func (w *MqWorker) loop() {
	log.Debug("w.loop start")
	cli := GetConsumer(w.topic)
	for {
		endChan := make(chan int)
		respChan := make(chan mqHttpSdk.ConsumeMessageResponse)
		errChan := make(chan error)
		go func() {
			defer func() {
				if r := recover(); r != nil {
					log.Error("捕获到的错误：%s\n", r)
					endChan <- 1
				}
			}()
			select {
			case resp := <-respChan:
				{
					// 处理业务逻辑
					var handles []string
					if DEBUG {
						log.Debug("Consume %d messages---->\n", len(resp.Messages))
					}
					for _, v := range resp.Messages {
						if DEBUG {
							log.Debug("\tMessageID: %s, PublishTime: %d, MessageTag: %s\n"+
								"\tConsumedTimes: %d, FirstConsumeTime: %d, NextConsumeTime: %d\n"+
								"\tBody: %s\n"+
								"\tProps: %s\n",
								v.MessageId, v.PublishTime, v.MessageTag, v.ConsumedTimes,
								v.FirstConsumeTime, v.NextConsumeTime, v.MessageBody, v.Properties)
						}
						msg := MqMsg{MessageBody: v.MessageBody, MessageTag: v.MessageTag,
							MessageKey: v.MessageKey, Properties: v.Properties}
						result := w.consumeFunc(w.topic, msg)
						if common.TestFail(result) {
							log.Error("rocketmq ConsumeMsg error=========" + common.ConvertJson(result))
						} else {
							handles = append(handles, v.ReceiptHandle)
						}
					}
					// NextConsumeTime前若不确认消息消费成功，则消息会重复消费
					// 消息句柄有时间戳，同一条消息每次消费拿到的都不一样
					ackErr := cli.AckMessage(handles)
					if ackErr != nil {
						// 某些消息的句柄可能超时了会导致确认不成功
						log.Error("rocketmq AckMessage error=========" + ackErr.Error())
						for _, errAckItem := range ackErr.(errors.ErrCode).Context()["Detail"].([]mqHttpSdk.ErrAckItem) {
							log.Info("\tErrorHandle:%s, ErrorCode:%s, ErrorMsg:%s\n",
								errAckItem.ErrorHandle, errAckItem.ErrorCode, errAckItem.ErrorMsg)
						}
						time.Sleep(time.Duration(3) * time.Second)
					} else {
						if DEBUG {
							log.Debug("Ack ---->\n\t%s\n", handles)
						}
					}
					endChan <- 1
				}
			case err := <-errChan:
				{
					// 没有消息
					if strings.Contains(err.(errors.ErrCode).Error(), "MessageNotExist") {
						if DEBUG {
							log.Debug("\nNo new message, continue!")
						}
					} else {
						log.Error("rocketmq errChan error=========" + err.Error())
						time.Sleep(time.Duration(3) * time.Second)
					}
					endChan <- 1
				}
			case <-time.After(35 * time.Second):
				{
					log.Info("Timeout of consumer message ??")
					endChan <- 1
				}
			}
		}()
		// 长轮询消费消息
		// 长轮询表示如果topic没有消息则请求会在服务端挂住3s，3s内如果有消息可以消费则立即返回
		cli.ConsumeMessage(respChan, errChan,
			w.numOfMessages, // 一次最多消费多少条(最多可设置为16条)
			w.waitSeconds,   // 长轮询时间多少秒（最多可设置为30秒）
		)
		<-endChan
	}
}

func (w *MqWorker) Stop() {
	if w.stop != nil {
		w.stop()
	}
}
